1 /*
2 * Copyright (C) 2007 The Guava Authors
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package com.google.common.eventbus;
18
19 import static com.google.common.base.Preconditions.checkNotNull;
20
21 import com.google.common.annotations.Beta;
22
23 import java.util.concurrent.ConcurrentLinkedQueue;
24 import java.util.concurrent.Executor;
25
26 /**
27 * An {@link EventBus} that takes the Executor of your choice and uses it to
28 * dispatch events, allowing dispatch to occur asynchronously.
29 *
30 * @author Cliff Biffle
31 * @since 10.0
32 */
33 @Beta
34 public class AsyncEventBus extends EventBus {
35 private final Executor executor;
36
37 /** the queue of events is shared across all threads */
38 private final ConcurrentLinkedQueue<EventWithSubscriber> eventsToDispatch =
39 new ConcurrentLinkedQueue<EventWithSubscriber>();
40
41 /**
42 * Creates a new AsyncEventBus that will use {@code executor} to dispatch
43 * events. Assigns {@code identifier} as the bus's name for logging purposes.
44 *
45 * @param identifier short name for the bus, for logging purposes.
46 * @param executor Executor to use to dispatch events. It is the caller's
47 * responsibility to shut down the executor after the last event has
48 * been posted to this event bus.
49 */
50 public AsyncEventBus(String identifier, Executor executor) {
51 super(identifier);
52 this.executor = checkNotNull(executor);
53 }
54
55 /**
56 * Creates a new AsyncEventBus that will use {@code executor} to dispatch
57 * events.
58 *
59 * @param executor Executor to use to dispatch events. It is the caller's
60 * responsibility to shut down the executor after the last event has
61 * been posted to this event bus.
62 * @param subscriberExceptionHandler Handler used to handle exceptions thrown from subscribers.
63 * See {@link SubscriberExceptionHandler} for more information.
64 * @since 16.0
65 */
66 public AsyncEventBus(Executor executor, SubscriberExceptionHandler subscriberExceptionHandler) {
67 super(subscriberExceptionHandler);
68 this.executor = checkNotNull(executor);
69 }
70
71 /**
72 * Creates a new AsyncEventBus that will use {@code executor} to dispatch
73 * events.
74 *
75 * @param executor Executor to use to dispatch events. It is the caller's
76 * responsibility to shut down the executor after the last event has
77 * been posted to this event bus.
78 */
79 public AsyncEventBus(Executor executor) {
80 super("default");
81 this.executor = checkNotNull(executor);
82 }
83
84 @Override
85 void enqueueEvent(Object event, EventSubscriber subscriber) {
86 eventsToDispatch.offer(new EventWithSubscriber(event, subscriber));
87 }
88
89 /**
90 * Dispatch {@code events} in the order they were posted, regardless of
91 * the posting thread.
92 */
93 @SuppressWarnings("deprecation") // only deprecated for external subclasses
94 @Override
95 protected void dispatchQueuedEvents() {
96 while (true) {
97 EventWithSubscriber eventWithSubscriber = eventsToDispatch.poll();
98 if (eventWithSubscriber == null) {
99 break;
100 }
101
102 dispatch(eventWithSubscriber.event, eventWithSubscriber.subscriber);
103 }
104 }
105
106 /**
107 * Calls the {@link #executor} to dispatch {@code event} to {@code subscriber}.
108 */
109 @Override
110 void dispatch(final Object event, final EventSubscriber subscriber) {
111 checkNotNull(event);
112 checkNotNull(subscriber);
113 executor.execute(
114 new Runnable() {
115 @Override
116 public void run() {
117 AsyncEventBus.super.dispatch(event, subscriber);
118 }
119 });
120 }
121 }